package top.jfunc.common.thread.monitor.adapter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import top.jfunc.common.thread.monitor.AutoChangeParamExecutorMonitorData;
import top.jfunc.common.thread.monitor.CountedRejectedExecutionHandler;
import top.jfunc.common.thread.monitor.StatisticsRunnable;
import top.jfunc.common.thread.monitor.change.ParamChangeBean;
import top.jfunc.common.thread.monitor.change.ParamChangeStrategy;
import top.jfunc.common.thread.monitor.change.ParamChangerUtil;
import top.jfunc.common.thread.monitor.change.ThreadPoolExecutorParamChangeAdapter;
import top.jfunc.common.thread.monitor.generator.CommonIdentifierGenerator;
import top.jfunc.common.thread.monitor.generator.IdentifierGenerator;

import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * 具备监控功能的线程池，在使用j.u.c {@link ThreadPoolExecutor}的地方替换为该类即可
 */
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor implements AutoChangeParamExecutorMonitorData {
    private static final Logger logger = LoggerFactory.getLogger(MonitoredThreadPoolExecutor.class);
    /**
     * 所属服务
     */
    private final String service;
    /**
     * 线程池名称
     */
    private final String poolName;
    /**
     * 线程池启动时间
     */
    private final Date startTime;
    /**
     * 唯一标识
     */
    private String identifier;
    /**
     * 任务执行统计器
     */
    private final MonitoredStatisticsCounter counter = new MonitoredStatisticsCounter();

    /**
     * 自动修改参数策略接口
     */
    private ParamChangeStrategy paramChangeStrategy;


    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       String service,
                                       String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.service = service;
        this.poolName = poolName;
        this.startTime = new Date();
        this.identifier = new CommonIdentifierGenerator().getIdentifier(this);
        resetRejectedExecutionHandler();
    }

    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       String service,
                                       String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.service = service;
        this.poolName = poolName;
        this.startTime = new Date();
        this.identifier = new CommonIdentifierGenerator().getIdentifier(this);
        resetRejectedExecutionHandler();
    }

    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       RejectedExecutionHandler handler,
                                       String service,
                                       String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.service = service;
        this.poolName = poolName;
        this.startTime = new Date();
        this.identifier = new CommonIdentifierGenerator().getIdentifier(this);
        resetRejectedExecutionHandler();
    }

    public MonitoredThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler,
                                       String service,
                                       String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.service = service;
        this.poolName = poolName;
        this.startTime = new Date();
        this.identifier = new CommonIdentifierGenerator().getIdentifier(this);
        resetRejectedExecutionHandler();
    }

    /**
     * 重置线程池的拒绝策略，可计数的
     */
    private void resetRejectedExecutionHandler() {
        CountedRejectedExecutionHandler handler = new CountedRejectedExecutionHandler(getRejectedExecutionHandler());
        setRejectedExecutionHandler(handler);
        this.counter.setRejectedExecutionHandler(handler);
    }

    @Override
    public void shutdown() {
        logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }
    @Override
    public List<Runnable> shutdownNow() {
        logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        counter.beforeExecute(t, r);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        counter.afterExecute(r, t);
    }

    @Override
    public void execute(Runnable command) {
        if(command instanceof StatisticsRunnable){
            super.execute(command);
            return;
        }
        super.execute(statisticsWrap(command));
    }

    protected Runnable statisticsWrap(Runnable command) {
        Runnable c = command;
        if(counter.getExecuteTimeout() > 0 || counter.getQueueTimeout() > 0){
            c = new StatisticsRunnable(command);
        }
        return c;
    }

    @Override
    public long getTotalExecuteTime() {
        return this.counter.getTotalExecuteTime();
    }
    @Override
    public long getMaxExecuteTime() {
        return this.counter.getMaxExecuteTime();
    }

    @Override
    public long getMinExecuteTime() {
        return this.counter.getMinExecuteTime();
    }

    @Override
    public int getExecuteTimeoutCount() {
        return this.counter.getExecuteTimeoutCount();
    }

    @Override
    public int getExecuteExceptionCount() {
        return this.counter.getExecuteExceptionCount();
    }

    @Override
    public long getMinQueueTime() {
        return this.counter.getMinQueueTime();
    }

    @Override
    public long getMaxQueueTime() {
        return this.counter.getMaxQueueTime();
    }

    @Override
    public int getQueueTimeoutCount() {
        return this.counter.getQueueTimeoutCount();
    }

    @Override
    public void resetStatistic() {
        this.counter.resetStatistic();
    }

    @Override
    public Date getStartTime(){
        return this.startTime;
    }
    @Override
    public String getPoolName() {
        return this.poolName;
    }

    @Override
    public String getService() {
        return this.service;
    }

    @Override
    public String getIdentifier() {
        return this.identifier;
    }


    public MonitoredThreadPoolExecutor setIdentifierGenerator(IdentifierGenerator identifierGenerator) {
        this.identifier = identifierGenerator.getIdentifier(this);
        return this;
    }

    public MonitoredThreadPoolExecutor setExecuteTimeout(long executeTimeout) {
        this.counter.setExecuteTimeout(executeTimeout);
        return this;
    }

    public long getExecuteTimeout() {
        return this.counter.getExecuteTimeout();
    }

    public MonitoredThreadPoolExecutor setQueueTimeout(long queueTimeout) {
        this.counter.setQueueTimeout(queueTimeout);
        return this;
    }

    public long getQueueTimeout() {
        return this.counter.getQueueTimeout();
    }

    @Override
    public int getQueueCapacity() {
        BlockingQueue<Runnable> blockingQueue = getQueue();
        return blockingQueue.remainingCapacity() + blockingQueue.size();
    }

    @Override
    public int getQueueSize() {
        return getQueue().size();
    }

    @Override
    public String getQueueClass() {
        return getQueue().getClass().getName();
    }

    @Override
    public String getThreadFactoryClass() {
        return getThreadFactory().getClass().getName();
    }

    @Override
    public String getRejectedExecutionHandlerClass() {
        return RejectedExecutionHandlerUtil.getOriginalRejectedExecutionHandlerClass(getRejectedExecutionHandler());
    }

    @Override
    public int getRejectedCount() {
        return RejectedExecutionHandlerUtil.getRejectedCount(getRejectedExecutionHandler());
    }

    @Override
    public void onChange(ParamChangeBean bean) {
        ParamChangerUtil.paramChange(new ThreadPoolExecutorParamChangeAdapter(this), bean);
    }

    public ParamChangeStrategy getParamChangeStrategy() {
        return paramChangeStrategy;
    }

    public MonitoredThreadPoolExecutor setParamChangeStrategy(ParamChangeStrategy paramChangeStrategy) {
        this.paramChangeStrategy = paramChangeStrategy;
        return this;
    }

    @Override
    public void doChange() {
        if(null != paramChangeStrategy){
            paramChangeStrategy.doChange(this);
        }
    }
}
